【CDK】AppFlowでSalesforceからS3へデータロード

【CDK】AppFlowでSalesforceからS3へデータロード

Clock Icon2025.01.06

はじめに

データ事業本部ビッグデータチームのkasamaです。
今回はAppFlowでSalesforceからS3へデータをロードする実装をCDKで行いたいと思います。

前提

今回実現したい構成は以下になります。
sf-appflow-s3

  • Amazon AppFlowによりSalesforceからオブジェクトを取得し、S3にロードします。
  • 失敗した場合はEventBridgeのイベントトリガーでエラー通知します。

事前にSalesforceのDeveloper Editionに登録していることとします。まだの方は以下が参考になります。
https://note.com/keitafukui/n/n0b0a29e9e495

今回はSalesforceの取引先(Account)オブジェクトを対象とします。
Screenshot 2025-01-04 at 19.54.27

今回の実装コードについては、Github上に格納してあるのでご確認いただければと思います。

https://github.com/cm-yoshikikasama/blog_code/tree/main/43_appflow_sf_to_s3

43_appflow_sf_to_s3# tree
.
|-- README.md
|-- bin
|   `-- app.ts
|-- cdk.json
|-- cloudformation
|   `-- s3.yaml
|-- jest.config.js
|-- lib
|   |-- salesforce-account-flow.ts
|   `-- salesforce-appflow-stack.ts
|-- package-lock.json
|-- package.json
|-- parameter.ts
|-- sql
|   |-- cm_kasama_db.sql
|   `-- cm_kasama_salesforce_dev_db.raw_account.sql
|-- test
|   `-- 43_appflow_sf_to_s3.test.ts
`-- tsconfig.json

8 directories, 20 files

AppFlowコネクタをAWS Management Consoleから作成

事前にSalesforce上で接続アプリケーション設定をすることで、AppFlowコネクタもCDK or Cfnで実装することは可能ですが、今回はなるべく簡易的にしたいため、AWS Management Consoleから作成します。

https://www.sunnycloud.jp/column/appflowのconnectorprofileをcfnで作成する/

手動でコネクタ作成するためには、Amazon AppFlowの接続画面からコネクタにSalesforceを作成し、接続を作成をクリックします。
Screenshot 2025-01-04 at 20.25.33
接続名は任意とし、Salesforce 環境はProductionを選択します。その他はデフォルトのままで接続するをクリックします。
Screenshot 2025-01-04 at 20.39.09
次にアクセス許可の画面となるので、許可することでコネクタが作成されます。
Screenshot 2025-01-04 at 20.26.06

実装

cloudformation/s3.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'S3 bucket for AppFlow with necessary permissions'

Resources:
  S3SalesforceRawDataBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName:  <your-s3-bucket-name>
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  S3SalesforceRawDataBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3SalesforceRawDataBucket
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: appflow.amazonaws.com
            Action:
              - s3:PutObject
              - s3:GetBucketAcl
              - s3:PutObjectAcl
              - s3:AbortMultipartUpload
              - s3:ListMultipartUploadParts
              - s3:ListBucketMultipartUploads
            Resource:
              - !Sub arn:aws:s3:::${S3SalesforceRawDataBucket}
              - !Sub arn:aws:s3:::${S3SalesforceRawDataBucket}/*
            Condition:
              StringEquals:
                aws:SourceAccount: !Ref AWS::AccountId
Outputs:
  S3SalesforceRawDataBucketName:
    Value: !Ref S3SalesforceRawDataBucket
    Export:
      Name: <your-s3-bucket-name>

データを格納するS3 Bucketをyamlファイルで定義しています。CDKで定義しても良かったですが、簡単に定義できることを考慮し、yamlにしています。

bin/app.ts
#!/usr/bin/env node
import * as cdk from "aws-cdk-lib";
import { AppFlowStack } from "../lib/salesforce-appflow-stack";
import { devParameter, prodParameter } from "../parameter";

const app = new cdk.App();

const envKey = app.node.tryGetContext("environment") ?? "dev"; // default: dev

let parameter;

if (envKey === "dev") {
  parameter = devParameter;
} else {
  parameter = prodParameter;
}

new AppFlowStack(app, `CMKasamaAppFlow${envKey.toUpperCase()}`, {
  description: `${parameter.projectName}-${parameter.envName}-test-tag`,
  env: {
    account: parameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
    region: parameter.env?.region || process.env.CDK_DEFAULT_REGION,
  },
  tags: {
    Repository: `${parameter.projectName}-${parameter.envName}-test-tag`,
    Environment: parameter.envName,
  },

  projectName: parameter.projectName,
  envName: parameter.envName,
});

app.tsではスタックを定義しています。環境変数はparameter.tsから参照しています。

lib/salesforce-account-flow.ts
import * as appflow from "aws-cdk-lib/aws-appflow";
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";
import type * as s3 from "aws-cdk-lib/aws-s3";
import type * as sns from "aws-cdk-lib/aws-sns";
import { Construct } from "constructs";

// フロー設定
const FLOW_CONFIG = {
  objectName: "Account",
  flowStatus: "Suspended",
  schedule: {
    startTime: "2099-01-01T00:00:00+09:00",
    expression: "rate(1days)",
    offset: 0,
  },
  s3Prefix: "sf-account-flow",
};
export interface SalesforceFlowProps {
  envName: string;
  projectName: string;
  outDataBucket: s3.IBucket;
  salesforceConnectorProfile: string;
  errorNotificationTopic: sns.ITopic;
}

export class SalesforceAccountFlow extends Construct {
  constructor(scope: Construct, id: string, props: SalesforceFlowProps) {
    super(scope, id);

    // AppFlowのフロー名
    const appFlowName = `${props.projectName}-${props.envName}-${FLOW_CONFIG.s3Prefix}`;
    // AppFlowのフロー定義
    const flow = new appflow.CfnFlow(this, "SalesforceAccountFlow", {
      flowName: appFlowName,
      destinationFlowConfigList: [
        {
          connectorType: "S3",
          destinationConnectorProperties: {
            s3: {
              bucketName: props.outDataBucket.bucketName,
              s3OutputFormatConfig: {
                fileType: "PARQUET",
                aggregationConfig: {
                  aggregationType: "None",
                },
                prefixConfig: {
                  prefixType: "PATH",
                  prefixFormat: "DAY",
                },
                preserveSourceDataTyping: true,
              },
            },
          },
        },
      ],
      sourceFlowConfig: {
        connectorType: "Salesforce",
        connectorProfileName: props.salesforceConnectorProfile,
        sourceConnectorProperties: {
          salesforce: {
            object: FLOW_CONFIG.objectName,
            // 新しく追加されたフィールドを自動的にインポート
            enableDynamicFieldUpdate: false,
            includeDeletedRecords: true,
          },
        },
        incrementalPullConfig: {
          datetimeTypeFieldName: "LastModifiedDate",
        },
      },
      tasks: [
        {
          taskType: "Map_all",
          sourceFields: [],
          taskProperties: [],
        },
      ],
      triggerConfig: {
        triggerType: "Scheduled",
        triggerProperties: {
          scheduleStartTime: Math.floor(
            new Date(FLOW_CONFIG.schedule.startTime).getTime() / 1000
          ),
          scheduleExpression: FLOW_CONFIG.schedule.expression,
          timeZone: "Asia/Tokyo",
          scheduleOffset: FLOW_CONFIG.schedule.offset,
          dataPullMode: "Incremental",
        },
      },
      flowStatus: FLOW_CONFIG.flowStatus,
    });

    // AppFlow Failure EventBridgeルールの作成
    const appFlowFailureRule = new events.Rule(this, "AppFlowFailureRule", {
      eventPattern: {
        source: ["aws.appflow"],
        detailType: ["AppFlow End Flow Run Report"],
        detail: {
          "flow-name": [appFlowName],
          status: ["Execution Failed"],
        },
      },
    });
    // EventBridgeルールのターゲットとしてSNSトピックを設定
    appFlowFailureRule.addTarget(
      new targets.SnsTopic(props.errorNotificationTopic, {
        message: events.RuleTargetInput.fromText(
          `AppFlow execution failed for flow: ${appFlowName}. Please check the AWS AppFlow console for more details.`
        ),
      })
    );
  }
}

salesforce-account-flow.tsでは、Salesforceのオブジェクト連携が増えて、AppFlowを追加する必要があることを想定し、FLOW_CONFIGに設定項目をまとめています。AppFlowの設定としては、CDKでのデプロイ時にstartTimeで過去日を設定しているとエラーとなるため、未来日を設定します。includeDeletedRecordsの値をtrueとすることで削除したレコードが存在する場合は、isdeletedカラムがtrueのレコードが連携されます。datetimeTypeFieldNameで増分転送用のカラムをLastModifiedDateに設定しています。taskTypeMap_allを選択することで抽出カラムが全カラムとなります。

lib/salesforce-appflow-stack.ts
import * as cdk from "aws-cdk-lib";
import type * as athena from "aws-cdk-lib/aws-athena";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as sns from "aws-cdk-lib/aws-sns";
import type { Construct } from "constructs";
import { SalesforceAccountFlow } from "./salesforce-account-flow";

export interface AppFlowStackProps extends cdk.StackProps {
	envName: string;
	projectName: string;
}

export class AppFlowStack extends cdk.Stack {
	public readonly outDataBucket: s3.IBucket;
	public readonly errorNotificationTopic: sns.Topic;
	public readonly salesforceConnectorProfile: string;
	public readonly athenaWorkgroup: athena.CfnWorkGroup;

	constructor(scope: Construct, id: string, props: AppFlowStackProps) {
		super(scope, id, props);

		// 共通リソースの作成
		this.outDataBucket = this.createS3Bucket(props);
		this.errorNotificationTopic = this.createErrorNotificationTopic(props);
		this.salesforceConnectorProfile = `${props.projectName}-${props.envName}-salesforce-flow-connector`;

		// Salesforceオブジェクトごとのフロー作成
		new SalesforceAccountFlow(this, "AccountFlow", {
			envName: props.envName,
			projectName: props.projectName,
			outDataBucket: this.outDataBucket,
			salesforceConnectorProfile: this.salesforceConnectorProfile,
			errorNotificationTopic: this.errorNotificationTopic,
		});
	}

	private createS3Bucket(props: AppFlowStackProps): s3.IBucket {
		return s3.Bucket.fromBucketName(this, "OutDataBucket", `<your-s3-bucket>`);
	}

	private createErrorNotificationTopic(props: AppFlowStackProps): sns.Topic {
		return new sns.Topic(this, "ErrorNotificationTopic", {
			topicName: `${props.projectName}-${props.envName}-error-notification-topic`,
		});
	}
}

salesforce-appflow-stack.tsでは共通で使用するリソースとSalesforceオブジェクトごとのAppFlowを定義しています。

parameter.ts
import { Environment } from "aws-cdk-lib";

// Parameters for Application
export interface AppParameter {
  env: Environment;
  envName: string;
  projectName: string;
}

// Example
export const devParameter: AppParameter = {
  envName: "dev",
  projectName: "cm-kasama",
  env: {},
  // env: { account: "xxxxxx", region: "ap-northeast-1" },
};

export const prodParameter: AppParameter = {
  envName: "prod",
  projectName: "cm-kasama",
  env: {},
  // env: { account: "xxxxxx", region: "ap-northeast-1" },
};

parameter.tsでは環境ごとに活用するparameterを定義しています。

デプロイ

S3 デプロイ

S3はyamlファイルで定義しているのでCloudFormationでデプロイします。AWS Management Console上のCloudFormationを選択し、スタックの作成からテンプレートファイルをアップロードでyamlファイルをアップロードし、スタックを作成します。

Databse, table作成

Athena上でDDLを実行し、tableを作成します。

DDL
CREATE DATABASE IF NOT EXISTS cm_kasama_salesforce_dev_db;
CREATE EXTERNAL TABLE cm_kasama_salesforce_dev_db.raw_account(
  Id STRING,
  IsDeleted BOOLEAN,
  MasterRecordId STRING,
  Name STRING,
  Type STRING,
  ParentId STRING,
  BillingStreet STRING,
  BillingCity STRING,
  BillingState STRING,
  BillingPostalCode STRING,
  BillingCountry STRING,
  BillingLatitude DOUBLE,
  BillingLongitude DOUBLE,
  BillingGeocodeAccuracy STRING,
  BillingAddress STRING,
  ShippingStreet STRING,
  ShippingCity STRING,
  ShippingState STRING,
  ShippingPostalCode STRING,
  ShippingCountry STRING,
  ShippingLatitude DOUBLE,
  ShippingLongitude DOUBLE,
  ShippingGeocodeAccuracy STRING,
  ShippingAddress STRING,
  Phone STRING,
  Fax STRING,
  AccountNumber STRING,
  Website STRING,
  PhotoUrl STRING,
  Sic STRING,
  Industry STRING,
  AnnualRevenue STRING,
  NumberOfEmployees INT,
  Ownership STRING,
  TickerSymbol STRING,
  Description STRING,
  Rating STRING,
  Site STRING,
  OwnerId STRING,
  CreatedDate TIMESTAMP,
  CreatedById STRING,
  LastModifiedDate TIMESTAMP,
  LastModifiedById STRING,
  SystemModstamp TIMESTAMP,
  LastActivityDate TIMESTAMP,
  LastViewedDate TIMESTAMP,
  LastReferencedDate TIMESTAMP,
  Jigsaw STRING,
  JigsawCompanyId STRING,
  CleanStatus STRING,
  AccountSource STRING,
  DunsNumber STRING,
  Tradestyle STRING,
  NaicsCode STRING,
  NaicsDesc STRING,
  YearStarted STRING,
  SicDesc STRING,
  DandbCompanyId STRING,
  OperatingHoursId STRING,
  CustomerPriority__c STRING,
  SLA__c STRING,
  Active__c STRING,
  NumberofLocations__c DOUBLE,
  UpsellOpportunity__c STRING,
  SLASerialNumber__c STRING,
  SLAExpirationDate__c TIMESTAMP
)
PARTITIONED BY (
    year string,
    month string,
    day string
)
STORED AS PARQUET
LOCATION 's3://<your-s3-bucket>/cm-kasama-dev-sf-account-flow/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.year.type' = 'date',
    'projection.year.format' = 'yyyy',
    'projection.year.range' = '2024,NOW',
    'projection.month.type' = 'integer',
    'projection.month.range' = '1,12',
    'projection.month.digits' = '2',
    'projection.day.type' = 'integer',
    'projection.day.range' = '1,31',
    'projection.day.digits' = '2',
    'storage.location.template' = 's3://<your-s3-bucket>/cm-kasama-dev-sf-account-flow/${year}/${month}/${day}'
);

CDK デプロイ

package.jsonがあるディレクトリで依存関係をインストールします。

npm install

同じくcdk.jsonがあるディレクトリでデプロイコマンドを実行します。--allはCDKアプリケーションに含まれる全てのスタックをデプロイするためのオプション、--require-approval neverはセキュリティ的に敏感な変更やIAMリソースの変更を含むデプロイメント時の承認を求めるダイアログ表示を完全にスキップします。neverは、どんな変更でも事前確認なしにデプロイすることを意味します。今回は検証用なので指定していますが、慎重にデプロイする場合は必要のないオプションになるかもしれません。-cenvironmentを指定し、環境に合わせたデプロイを行います。

npx cdk deploy --all --require-approval never -c environment=dev --profile <YOUR_AWS_PROFILE>

実行

正常系

デプロイされたAppFlowをオンデマンドモードに修正し手動で実行しました。成功で終了しました。
Screenshot 2025-01-05 at 22.20.59

Athena上でもtableに対してSelectしたところ15件出力されていました。
Screenshot 2025-01-05 at 22.24.12

異常系

次に異常系を確認します。
最初に作成されたSNS TOPICに対してメールアドレスでサブスクリプション登録します。
Screenshot 2025-01-06 at 8.42.16

AppFlowで異常を発生させるためにS3 Bucket policyを手動で削除します。
Screenshot 2025-01-06 at 8.43.05

この状態でAppFlow実行し、想定通りエラーとなりました。
Screenshot 2025-01-06 at 8.46.37
EventBridgeトリガーでSNS TOPICから通知も来ています。
Screenshot 2025-01-06 at 8.46.54

Amazon AppFlowの注意事項

実装や試験を進める中で何点かつまづいたポイントがあったので、記載しておきます。

増分転送時の初回連携分は30日間のデータとなる

AWSの公式サイトにも記載がありますが、増分転送モードの場合、最初のスケジュールによってトリガーされたフローは、過去 30 日間のレコードを取得します。

When you select incremental transfer, Amazon AppFlow transfers only the records that have been added or changed since the last successful flow run. You can also select a source timestamp field to specify how Amazon AppFlow identifies new or changed records. For example, if you have a Created Date timestamp field, choose this to instruct Amazon AppFlow to transfer only newly-created records (and not changed records) since the last successful flow run. The first schedule-triggered flow will pull 30 days of past records at the time of the first flow run.

https://docs.aws.amazon.com/appflow/latest/userguide/flow-triggers.html

試しに初回を増分転送にして、スケジュール実行します。
Screenshot 2025-01-06 at 7.51.56
最終更新日が30日以内のデータは1件です。
Screenshot 2025-01-04 at 19.54.27

AppFlowは成功しましたが、処理されたレコード件数は1件です。
Screenshot 2025-01-06 at 7.55.43
Athenaでクエリも実行しましたが、データとしては1件のみのため、やはり30日間のデータしか取得できないようでした。そのため、初回連携時に全件を取得したい場合は、スケジュール実行の完全転送モードで実行することが良いと思います。
Screenshot 2025-01-06 at 7.56.09

AppFlowで作成されるpartitionはUTC

試しに分でのpartition設定としてオンデマンド実行します。
Screenshot 2025-01-06 at 8.01.22
AppFlowはJSTの2025年1月6日 08:01に実行しました。
Screenshot 2025-01-06 at 8.04.20
S3 URIを確認すると2025年1月5日 23:01となっていますのでUTCとなります。
Screenshot 2025-01-06 at 8.04.09

削除レコードの扱い

AppFlowのその他の設定で削除されたレコードのインポートにチェックが入っていると削除レコードも増分として転送されます。
Screenshot 2025-01-06 at 8.16.21
試しにSalesforce上でレコードを1件削除します。
Screenshot 2025-01-06 at 8.13.50
Salesforceではレコードを削除すると15日以内はごみ箱に物理削除待ちレコードとして登録されます。15日を経過するとそのデータは自動的に完全に削除(物理削除)されます。
Screenshot 2025-01-06 at 8.15.03

https://sustainalead.com/trash/#:~:text=削除された項目は,レコード」という観点があります。

AppFlowを実行し、出力されたデータをAthenaで参照しました。isdeletedカラムがtrueで連携されています。このカラムを用いて削除されているか否かを判断できます。
Screenshot 2025-01-06 at 8.23.19

AppFlowのSalesforce連携ではスケジュール実行は1分あたり1回

AWS公式サイトにも記載がありますが、AppFlowのSalesforce連携では1分間に最大1回のフロー実行となります。二つ同時に実行してみたところ、片方はスケジュール時間通りに動作し、片方は2,3分遅れて動作しました。取り扱うデータ量によって時間差があると思いますのであくまで参考程度にしていただければと思います。実際にプロジェクトで導入する際は、どのくらい時間差があるか、差分データが過不足なく取得できているかの検証が必要だと思います。

Salesforce: Maximum frequency of one flow run per minute

https://docs.aws.amazon.com/appflow/latest/userguide/service-quotas.html

最後に

AppFlowで簡単にデータロードはできるので、複雑な加工要件が無ければ一つの選択肢として良いと思いました!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.